1 /***
2 * Redistribution and use of this software and associated documentation
3 * ("Software"), with or without modification, are permitted provided
4 * that the following conditions are met:
5 *
6 * 1. Redistributions of source code must retain copyright
7 * statements and notices. Redistributions must also contain a
8 * copy of this document.
9 *
10 * 2. Redistributions in binary form must reproduce the
11 * above copyright notice, this list of conditions and the
12 * following disclaimer in the documentation and/or other
13 * materials provided with the distribution.
14 *
15 * 3. The name "Exolab" must not be used to endorse or promote
16 * products derived from this Software without prior written
17 * permission of Exoffice Technologies. For written permission,
18 * please contact info@exolab.org.
19 *
20 * 4. Products derived from this Software may not be called "Exolab"
21 * nor may "Exolab" appear in their names without prior written
22 * permission of Exoffice Technologies. Exolab is a registered
23 * trademark of Exoffice Technologies.
24 *
25 * 5. Due credit should be given to the Exolab Project
26 * (http://www.exolab.org/).
27 *
28 * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29 * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30 * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31 * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
32 * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33 * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35 * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39 * OF THE POSSIBILITY OF SUCH DAMAGE.
40 *
41 * Copyright 2000-2005 (C) Exoffice Technologies Inc. All Rights Reserved.
42 *
43 * $Id: JmsConnectionConsumer.java,v 1.2 2005/03/18 03:36:37 tanderson Exp $
44 */
45 package org.exolab.jms.client;
46
47 import javax.jms.Connection;
48 import javax.jms.ConnectionConsumer;
49 import javax.jms.Destination;
50 import javax.jms.JMSException;
51 import javax.jms.Message;
52 import javax.jms.MessageConsumer;
53 import javax.jms.MessageListener;
54 import javax.jms.ServerSession;
55 import javax.jms.ServerSessionPool;
56 import javax.jms.Session;
57 import javax.jms.Topic;
58
59 import org.apache.commons.logging.Log;
60 import org.apache.commons.logging.LogFactory;
61
62
63 /***
64 * Implementation of the <code>javax.jms.ConnectionConsumer</code> interface.
65 *
66 * @author <a href="mailto:jima@comware.com.au">Jim Alateras</a>
67 * @author <a href="mailto:tma@netspace.net.au">Tim Anderson</a>
68 * @version $Revision: 1.2 $ $Date: 2005/03/18 03:36:37 $
69 */
70 class JmsConnectionConsumer
71 implements ConnectionConsumer, MessageListener {
72
73 /***
74 * The session to receive messages via.
75 */
76 private Session _session;
77
78 /***
79 * The consumer of messages.
80 */
81 private MessageConsumer _consumer;
82
83 /***
84 * The server session pool.
85 */
86 private ServerSessionPool _pool;
87
88 /***
89 * The logger
90 */
91 private static final Log _log =
92 LogFactory.getLog(JmsConnectionConsumer.class);
93
94
95 /***
96 * Construct a new <code>JmsConnectionConsumer</code>.
97 *
98 * @param connection the connection which created this
99 * @param destination the destination to access
100 * @param pool the server session pool
101 * @param selector the message selector. May be <code>null</code>
102 * @param maxMessages the maximum number of messages that can be
103 * assigned to a server session at one time
104 * @throws JMSException if the consumer cannot be constructed
105 */
106 public JmsConnectionConsumer(Connection connection, Destination destination,
107 ServerSessionPool pool, String selector,
108 int maxMessages)
109 throws JMSException {
110 this(connection, destination, null, pool, selector, maxMessages);
111 }
112
113 /***
114 * Construct a new <code>JmsConnectionConsumer</code>.
115 *
116 * @param connection the connection which created this
117 * @param destination the destination to access
118 * @param subscriptionName the durable subscription name. May be
119 * <code>null</code>
120 * @param pool the server session pool
121 * @param selector the message selector. May be <code>null</code>
122 * @param maxMessages the maximum number of messages that can be
123 * assigned to a server session at one time
124 * @throws JMSException if the consumer cannot be constructed
125 */
126 public JmsConnectionConsumer(Connection connection, Destination destination,
127 String subscriptionName,
128 ServerSessionPool pool, String selector,
129 int maxMessages)
130 throws JMSException {
131 if (connection == null) {
132 throw new IllegalArgumentException("Argument 'connection' is null");
133 }
134 if (destination == null) {
135 throw new IllegalArgumentException(
136 "Argument 'destination' is null");
137 }
138 if (pool == null) {
139 throw new IllegalArgumentException("Argument 'pool' is null");
140 }
141 if (maxMessages <= 0) {
142 throw new IllegalArgumentException(
143 "Argument 'maxMessages' must be > 0");
144 }
145
146 _pool = pool;
147
148 _session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
149 if (subscriptionName == null) {
150 _consumer = _session.createConsumer(destination, selector, false);
151 } else {
152 _consumer = _session.createDurableSubscriber((Topic) destination,
153 subscriptionName,
154 selector, false);
155 }
156
157 _consumer.setMessageListener(this);
158 }
159
160 /***
161 * Returns the server session pool associated with this connection consumer.
162 *
163 * @return the server session pool used by this connection consumer
164 */
165 public ServerSessionPool getServerSessionPool() {
166 return _pool;
167 }
168
169 /***
170 * Close the connection consumer, freeing any allocated resources.
171 *
172 * @throws JMSException if the consumer cannot be closed
173 */
174 public void close() throws JMSException {
175 try {
176 _consumer.close();
177 _session.close();
178 } finally {
179 _pool = null;
180 _consumer = null;
181 _session = null;
182 }
183 }
184
185 /***
186 * Impmentation of MessageListener.onMessage, to receive messages
187 * from the server. In this most simple case, it loads each message into a
188 * server session and calls the start method.
189 *
190 * @param message the message
191 */
192 public void onMessage(Message message) {
193 try {
194
195
196 ServerSession serverSession = _pool.getServerSession();
197 JmsSession session = (JmsSession) serverSession.getSession();
198 message.acknowledge();
199 session.addMessage(message);
200 serverSession.start();
201 } catch (Exception exception) {
202 _log.error(exception, exception);
203 }
204 }
205
206 }